[CORE-16135]: Schema Registry: Factor kafka client usage into new transport interface #30254
Pull request overview
Refactors Schema Registry’s internal _schemas topic I/O to go through a new transport abstraction, enabling alternative backends (e.g., RPC-based) while keeping existing Kafka-client behavior available via kafka_client_transport.
Changes:
- Introduce `pandaproxy::schema_registry::transport` and factor internal topic operations (produce/consume/HWM/auth/mitigation) behind it.
- Add `kafka_client_transport` implementation and wire `api`, `service`, and `seq_writer` to use the transport pointer.
- Update tests to use a `noop_transport` and add a collision-simulation transport to exercise delete retry behavior.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/v/pandaproxy/schema_registry/transport.h | Adds the new transport interface used by SR components. |
| src/v/pandaproxy/schema_registry/kafka_client_transport.h | Declares Kafka-client-backed transport implementation. |
| src/v/pandaproxy/schema_registry/kafka_client_transport.cc | Implements Kafka-client-backed transport behaviors. |
| src/v/pandaproxy/schema_registry/service.h | Switches SR service dependency from kafka client to transport. |
| src/v/pandaproxy/schema_registry/service.cc | Routes internal topic init/load through transport and adds retry logic. |
| src/v/pandaproxy/schema_registry/seq_writer.h | Switches sequencer dependency from kafka client to transport; adds delete retry cache state. |
| src/v/pandaproxy/schema_registry/seq_writer.cc | Uses transport for produce/consume/HWM and adds delete collision retry behavior. |
| src/v/pandaproxy/schema_registry/api.h | Replaces sharded Kafka client with sharded Kafka transport. |
| src/v/pandaproxy/schema_registry/api.cc | Wires up transport lifecycle and passes transport pointers to service/sequencer. |
| src/v/pandaproxy/schema_registry/fwd.h | Adds forward declarations for new transport types. |
| src/v/pandaproxy/schema_registry/BUILD | Adds new transport and kafka_client_transport Bazel targets and updates deps. |
| src/v/pandaproxy/schema_registry/test/utils.h | Adds noop_transport for tests that don’t use topic I/O. |
| src/v/pandaproxy/schema_registry/test/consume_to_store.cc | Updates tests to use noop_transport; adds collision transport test. |
| src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc | Updates test to use noop_transport instead of dummy Kafka client. |
| src/v/pandaproxy/schema_registry/test/BUILD | Updates test deps to include new transport targets. |
0e4a6d3 to 4502f20
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
src/v/pandaproxy/schema_registry/seq_writer.cc:620
`_delete_versions_cache` is populated before several operations that can throw (e.g., `is_referenced`, tombstone lookups). If an exception is thrown after caching, the cache persists beyond this delete attempt and a later call for the same subject (when it is already soft-deleted) can incorrectly return this stale cached version list. Consider clearing the cache on any exception path after it is set (e.g., a scope guard / try-catch that resets `_delete_versions_cache` before rethrowing), or delaying cache population until after all non-retriable validations have passed.
// Cache versions for potential retry — after a subject-level soft
// delete all versions are marked deleted and the pre-delete list
// cannot be reconstructed from the store. Tagged with subject so
// stale entries from a prior delete of a different subject are ignored.
_delete_versions_cache.emplace(delete_version_cache{sub, versions.copy()});
// Check that the subject is not referenced
if (co_await _store.is_referenced(sub, std::nullopt)) {
throw as_exception(has_references(sub, versions.back()));
}
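The scope-guard option suggested above could look roughly like the following. This is a synchronous sketch, not the `seq_writer` code: `scope_guard`, `sequencer`, the `referenced` flag (standing in for `_store.is_referenced`), and the plain `int` version type are all hypothetical, and a coroutine version would need the same care around suspension points.

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical mirror of the cached entry: subject plus its versions.
struct delete_version_cache {
    std::string subject;
    std::vector<int> versions;
};

// Minimal guard: runs the cleanup on destruction unless dismissed.
template <typename F>
class scope_guard {
public:
    explicit scope_guard(F f) : _f(std::move(f)) {}
    scope_guard(const scope_guard&) = delete;
    scope_guard& operator=(const scope_guard&) = delete;
    ~scope_guard() {
        if (_armed) {
            _f();
        }
    }
    void dismiss() noexcept { _armed = false; }

private:
    F _f;
    bool _armed = true;
};

struct sequencer {
    std::optional<delete_version_cache> cache;
    bool referenced = false; // stand-in for _store.is_referenced(...)

    std::vector<int>
    delete_subject(const std::string& sub, std::vector<int> versions) {
        cache.emplace(delete_version_cache{sub, versions});
        // If any validation below throws, drop the cache entry again.
        scope_guard guard([this] { cache.reset(); });

        if (referenced) {
            throw std::runtime_error("subject has references");
        }

        // Validations passed: keep the entry for a potential retry.
        guard.dismiss();
        return versions;
    }
};
```

With this arrangement a failed validation leaves `cache` empty, so a later call for the same subject cannot pick up a stale version list, while the success path still keeps the entry around.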
4502f20 to 7c45444
7c45444 to b73bcef
b73bcef to 23114c7
65cad38 to 01aa966
CI test results
test results on build#83603
test results on build#83618
test results on build#83910
01aa966 to 709a3c7
@dotnwat @pgellert @nguyen-andrew - I think this one is pretty clean. If you want to see how it maps into the rpc stuff, check out #30046
/ci-repeat 1
constexpr auto max_backoff = 5000ms;
auto backoff = 100ms;
for (int attempts = 0;; ++attempts) {
    auto fut = co_await ss::coroutine::as_future(
Are co-routine lambdas safe now? I recall an announcement around this but haven't dug in enough yet.
this is the current state of play https://clang.llvm.org/extra/clang-tidy/checks/cppcoreguidelines/avoid-capturing-lambda-coroutines.html
tl;dr it's ok if you use explicit this param (or don't capture anything)
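For readers following the retry loop in the hunk this thread is attached to, here is a rough sketch of how a capped backoff schedule with those two constants could behave. Only `max_backoff = 5000ms` and the initial `backoff = 100ms` come from the hunk; the doubling step and the `backoff_schedule` helper are assumptions for illustration, since the hunk does not show how the delay grows.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <vector>

using namespace std::chrono_literals;

// Returns the delay used before each retry, assuming the delay
// doubles after every attempt and is clamped to max_backoff.
std::vector<std::chrono::milliseconds> backoff_schedule(int attempts) {
    constexpr std::chrono::milliseconds max_backoff = 5000ms;
    std::chrono::milliseconds backoff = 100ms;

    std::vector<std::chrono::milliseconds> delays;
    for (int i = 0; i < attempts; ++i) {
        delays.push_back(backoff);
        // Assumed growth policy: double, but never exceed the cap.
        backoff = std::min<std::chrono::milliseconds>(
          backoff * 2, max_backoff);
    }
    return delays;
}
```

Under that assumed policy the delay reaches the cap on the seventh attempt and stays there, so a long outage costs at most one attempt every five seconds.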
// On retry after a write collision the subject may already be
// soft-deleted (by the winning writer). Return the version list
// including deleted, since after a subject-level soft delete all
// versions are marked deleted.
if (co_await _store.is_subject_deleted(sub)) {
    co_return std::make_optional(std::move(versions));
    co_return co_await _store.get_versions(sub, include_deleted::yes);
}

// Grab the versions before they're gone.
auto versions = co_await _store.get_versions(sub, include_deleted::no);
The commit title for this says that this leads to a "retry crash". How does this lead to a crash exactly? I suspect that it leads to an infinite or at least slow retry chain inside sequenced_write.
I think this new behaviour is incompatible, because we'd also start returning [list-of-all-soft-deleted-subvers] when soft-deleting a subject that is already soft-deleted (regardless of whether there's a race), whereas we are supposed to throw a 404 in that case as far as I can tell.
In the race scenario you describe, I think we're still supposed to return a 404, and the bug is perhaps to do with the logic of sequenced_write. Maybe we should only retry on write_collision errors inside sequenced_write and propagate all other errors immediately.
Am I missing something here?
This is what I was using for testing the behaviour under the scenario you describe where we're soft deleting a subject with all-soft-deleted subject versions:
# Adjust to taste
SR=http://localhost:8081
SUBJECT=test-subject-value
CT='Content-Type: application/vnd.schemaregistry.v1+json'
# 1. Register schema v1 (Avro record with one field)
curl -sS -X POST "$SR/subjects/$SUBJECT/versions" \
-H "$CT" \
-d '{"schema":"{\"type\":\"record\",\"name\":\"Foo\",\"fields\":[{\"name\":\"a\",\"type\":\"string\"}]}"}'
echo
# 2. Register schema v2 (add a field — backwards-compatible default added)
curl -sS -X POST "$SR/subjects/$SUBJECT/versions" \
-H "$CT" \
-d '{"schema":"{\"type\":\"record\",\"name\":\"Foo\",\"fields\":[{\"name\":\"a\",\"type\":\"string\"},{\"name\":\"b\",\"type\":\"int\",\"default\":0}]}"}'
echo
# Sanity check — should list [1, 2]
curl -sS "$SR/subjects/$SUBJECT/versions"; echo
# 3. Soft-delete version 1
curl -sS -X DELETE "$SR/subjects/$SUBJECT/versions/1"; echo
# 4. Soft-delete version 2
curl -sS -X DELETE "$SR/subjects/$SUBJECT/versions/2"; echo
# Live versions list — expect 404 / empty (all soft-deleted)
curl -sS -i "$SR/subjects/$SUBJECT/versions"; echo
# Same list including soft-deleted — should still show [1, 2]
curl -sS "$SR/subjects/$SUBJECT/versions?deleted=true"; echo
# 5. The interesting call: soft-delete the subject when all versions
# are already soft-deleted. Use -i so you see the status code.
curl -sS -i -X DELETE "$SR/subjects/$SUBJECT"; echo
> Maybe we should only retry on write_collision errors inside sequenced_write

good idea

> Am I missing something?

Don't think so. Added this after some test failure early on in refactoring, and I'm still not sure there was any faulty behavior at all. I kept it in on the off chance you or someone else would say "oh yes, of course that's a bug", but I think I'll just remove it. We can address in a follow up if it seems like there's a real improvement to be made.
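For reference, the idea of retrying only on write collisions and propagating every other error immediately could be sketched as below. This is not the actual `sequenced_write`: the exception types `write_collision` and `not_found`, the `sequenced_write_sketch` helper, and the `max_attempts` parameter are hypothetical, and the real code is coroutine-based rather than synchronous.

```cpp
#include <cassert>
#include <stdexcept>
#include <utility>

// Hypothetical error types, modelled as exceptions for the sketch.
struct write_collision : std::runtime_error {
    using std::runtime_error::runtime_error;
};
struct not_found : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Calls op(attempt) until it succeeds. Only write_collision triggers
// a retry; any other exception leaves the loop on the first throw.
template <typename Op>
auto sequenced_write_sketch(Op&& op, int max_attempts) {
    for (int attempt = 1;; ++attempt) {
        try {
            return op(attempt);
        } catch (const write_collision&) {
            if (attempt >= max_attempts) {
                throw; // give up after the last allowed attempt
            }
            // Otherwise fall through and try again.
        }
    }
}
```

In the already-soft-deleted case discussed above, the operation would raise the not-found error on its first attempt and the caller would see the 404 right away instead of going through the retry chain.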
709a3c7 to e028da1
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
force push fix PR comments from @pgellert
e028da1 to 57d80b7
force push rebase to fix merge conflicts.
/cdt
/cdt
Introduces a pluggable `transport` interface for schema registry's internal `_schemas` topic I/O, replacing the hardcoded `kafka::client` dependency with `kafka_client_transport`.
Backports Required
Release Notes